package com.hys.app.service.datasync.receive.common;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.hys.app.converter.datasync.MessageReceiveConverter;
import com.hys.app.framework.database.WebPage;
import com.hys.app.framework.database.mybatisplus.base.BaseServiceImpl;
import com.hys.app.framework.exception.ServiceException;
import com.hys.app.framework.rabbitmq.MessageSender;
import com.hys.app.framework.rabbitmq.MqMessage;
import com.hys.app.framework.util.DateUtil;
import com.hys.app.framework.util.SqlUtil;
import com.hys.app.framework.util.StringUtil;
import com.hys.app.mapper.datasync.MessageReceiveMapper;
import com.hys.app.model.base.rabbitmq.AmqpExchange;
import com.hys.app.model.datasync.dos.MessageReceiveDO;
import com.hys.app.model.datasync.dto.MessageReceiveDTO;
import com.hys.app.model.datasync.dto.MessageReceiveQueryParams;
import com.hys.app.model.base.Result;
import com.hys.app.model.datasync.enums.MessageReceiveStatusEnum;
import com.hys.app.model.datasync.enums.MessageReceiveTypeEnum;
import com.hys.app.model.datasync.vo.MessageReceiveVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Business-layer implementation for received messages.
 *
 * <p>Incoming messages are persisted first, then an MQ message carrying the record id is
 * published so the actual processing happens in {@link #handle(Long)}. Processing is
 * delegated to the {@link MessageReceiveExecutor} registered for the message type.
 *
 * @author 张崧
 * @since 2023-12-19 17:41:37
 */
@Service
@Slf4j
public class MessageReceiveManagerImpl extends BaseServiceImpl<MessageReceiveMapper, MessageReceiveDO> implements MessageReceiveManager {

    @Autowired
    private MessageReceiveConverter converter;

    @Autowired
    private MessageSender messageSender;

    /** All executor beans found in the context; may be {@code null} when none are defined. */
    @Autowired(required = false)
    private List<MessageReceiveExecutor> executorList;

    /** Executors indexed by the message type they report via {@code getType()}. */
    private Map<MessageReceiveTypeEnum, MessageReceiveExecutor> executorMap;

    /**
     * Builds the type-to-executor lookup table once dependencies are injected.
     * Falls back to an empty map when no executor was injected.
     */
    @PostConstruct
    private void init() {
        if (executorList == null) {
            executorMap = Collections.emptyMap();
            return;
        }
        executorMap = executorList.stream()
                .collect(Collectors.toMap(MessageReceiveExecutor::getType, Function.identity()));
    }

    /**
     * Queries a page of received messages.
     *
     * @param queryParams paging and filter parameters, passed straight to the mapper
     * @return the page converted to view objects
     */
    @Override
    public WebPage<MessageReceiveVO> list(MessageReceiveQueryParams queryParams) {
        WebPage<MessageReceiveDO> webPage = baseMapper.selectPage(queryParams);
        return converter.convert(webPage);
    }

    /**
     * Loads a single received message by id.
     *
     * @param id primary key of the message record
     * @return the record converted to a view object
     */
    @Override
    public MessageReceiveVO getDetail(Long id) {
        return converter.convert(getById(id));
    }

    /**
     * Stores an incoming message and schedules it for asynchronous processing.
     *
     * <p>A message whose insert fails with a unique-constraint violation is treated as a
     * duplicate: it is logged and skipped, and nothing is published.
     *
     * @param messageReceiveDTO the incoming message
     */
    @Override
    public void receive(MessageReceiveDTO messageReceiveDTO) {
        MessageReceiveDO messageReceiveDO = BeanUtil.copyProperties(messageReceiveDTO, MessageReceiveDO.class);
        messageReceiveDO.setReceiveTime(DateUtil.getDateline());
        messageReceiveDO.setStatus(MessageReceiveStatusEnum.Wait);

        try {
            // Persist first; the msgId column relies on a unique index to make receipt idempotent.
            super.save(messageReceiveDO);
        } catch (Exception e) {
            // The message already exists, so skip it.
            if (SqlUtil.isUniqueException(e)) {
                log.warn("接收到重复消息：{}", messageReceiveDO.getMsgId());
                return;
            }
            throw e;
        }

        // Hand off to asynchronous processing.
        messageSender.send(new MqMessage(AmqpExchange.MESSAGE_RECEIVE, "", messageReceiveDO.getId()));
    }

    /**
     * Executes a stored message with the executor registered for its type and records the outcome.
     *
     * <p>Any exception raised while executing or recording the result is logged and the
     * message is marked as failed with the formatted exception as its remark.
     *
     * @param id primary key of the message record to process
     * @throws ServiceException if the record does not exist, has already been processed
     *                          successfully, or no executor is registered for its type
     */
    @Override
    public void handle(Long id) {
        MessageReceiveDO messageReceiveDO = super.getById(id);
        if (messageReceiveDO == null) {
            // Fail with a descriptive error rather than a NullPointerException below.
            throw new ServiceException(StrUtil.format("消息不存在：{}", id));
        }
        if (messageReceiveDO.getStatus() == MessageReceiveStatusEnum.Success) {
            throw new ServiceException("处理成功的消息不能重复执行");
        }

        MessageReceiveExecutor messageExecutor = executorMap.get(messageReceiveDO.getType());
        if (messageExecutor == null) {
            throw new ServiceException(StrUtil.format("未找到消息执行器：{}", messageReceiveDO.getType()));
        }

        try {
            Result executeResult = messageExecutor.execute(messageReceiveDO.getContent());
            // Boolean.TRUE.equals tolerates a null success flag and treats it as a failure.
            MessageReceiveStatusEnum status = Boolean.TRUE.equals(executeResult.getSuccess())
                    ? MessageReceiveStatusEnum.Success
                    : MessageReceiveStatusEnum.Fail;
            baseMapper.updateStatus(messageReceiveDO.getId(), status, executeResult.getRemark());
        } catch (Exception e) {
            log.error("消息执行失败，id：{}", messageReceiveDO.getId(), e);
            baseMapper.updateStatus(messageReceiveDO.getId(), MessageReceiveStatusEnum.Fail, StringUtil.formatException(e));
        }
    }

}

